89033dde5000158a79d498d7ae091a946bccce89,bridge/src/main/java/io/rhiot/kafka/bridge/SinkBridgeEndpoint.java,SinkBridgeEndpoint,handle,#ProtonLink#,123

Before Change


			// set the BridgeConfig.AUTO_OFFSET_RESET property to "earliest"
			// (presumably the Kafka consumer offset reset policy; confirm against BridgeConfig)
			props.put(BridgeConfig.AUTO_OFFSET_RESET, "earliest");
			
			// create and start new thread for reading from Kafka
			// the runner receives the consumer properties, the topic/partition/offset,
			// the Vert.x instance together with ebName, the sender QoS and the offset tracker
			// NOTE(review): partition and offset are cast to Integer/Long here; assumes
			// they were already checked to be of those types earlier in the method
			this.kafkaConsumerRunner = new KafkaConsumerRunner<>(props, 
					topic, (Integer)partition, (Long)offset, 
					this.vertx, this.ebName, 
					sender.getQoS(), this.offsetTracker);
			
			// run the consumer on its own dedicated thread, kept in a field
			// (presumably so it can be stopped later; the stop path is not visible here)
			this.kafkaConsumerThread = new Thread(kafkaConsumerRunner);
			this.kafkaConsumerThread.start();

After Change


			// hand the consumer parameters (topic, partition, offset, ebName, QoS)
			// to the consumer through the config used for its deployment below
			// NOTE(review): partition and offset are cast to Integer/Long here; assumes
			// they were already checked to be of those types earlier in the method
			config.put(KafkaConsumerWorker.KAFKA_CONSUMER_TOPIC, topic);
			config.put(KafkaConsumerWorker.KAFKA_CONSUMER_PARTITION, (Integer)partition);
			config.put(KafkaConsumerWorker.KAFKA_CONSUMER_OFFSET, (Long)offset);
			config.put(KafkaConsumerWorker.KAFKA_CONSUMER_EBQUEUE, this.ebName);
			config.put(KafkaConsumerWorker.KAFKA_CONSUMER_QOS, sender.getQoS());
						
			// the offset tracker is passed through a setter instead of the config
			this.kafkaConsumerWorker = new KafkaConsumerWorker<>();
			this.kafkaConsumerWorker.setOffsetTracker(this.offsetTracker);
			// deploy the consumer as a verticle on this Vert.x instance with the config above
			// NOTE(review): no setWorker(true) is visible on these options; if the
			// verticle blocks while reading from Kafka, confirm it is not run on an event loop
			// NOTE(review): no completion handler is passed, so a failed deployment
			// is not observed at this call site
			DeploymentOptions options = new DeploymentOptions().setConfig(config);
			this.vertx.deployVerticle(this.kafkaConsumerWorker, options);
			
			// message sending on AMQP link MUST happen on Vert.x event loop due to
			// the access to the sender object provided by Vert.x handler